from kafka import KafkaProducer
import json
import pandas as pd
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

def putData():
    # 读取 CSV 文件
    df = pd.read_csv('JDData.csv', dtype=str)

    # 创建 KafkaProducer 实例
    producer = KafkaProducer(bootstrap_servers='hadoop101:9092')

    # 将 DataFrame 中的每一行转换为字符串，并保持原始格式
    for index, row in df.iterrows():
        # 使用 join 将行中的每个元素用逗号连接成一个字符串
        message = ','.join(row)
        # 发送消息到 Kafka
        producer.send('JD', value=message.encode('utf-8'))

    # 确保所有消息都已发送
    producer.flush()



def createTpis():
    # Kafka 集群配置
    bootstrap_servers = 'hadoop101:9092'
    topic_name = 'JD'
    num_partitions = 1  # 主题的分区数
    replication_factor = 1  # 主题的副本因子

    # 创建 KafkaAdminClient 实例
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    # 创建 NewTopic 实例
    topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

    # 创建主题
    admin_client.create_topics(new_topics=[topic])

    # 关闭 KafkaAdminClient
    admin_client.close()

if __name__ == '__main__':
    # createTpis()
    putData()
